From: Jeroen van der Heijden
Date: Fri, 11 Sep 2020 14:55:36 +0000 (+0200)
Subject: work on auto shard
X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~2^2~14^2~4
X-Git-Url: https://dgit.raspbian.org/%22http://www.example.com/cgi/%22/%22http:/www.example.com/cgi/%22?a=commitdiff_plain;h=de80d195a21b2f2cafc253d5fa0a9770201b2d17;p=siridb-server.git

work on auto shard
---

diff --git a/debian/tests/control b/debian/tests/control
index d6881040..8ce3097d 100644
--- a/debian/tests/control
+++ b/debian/tests/control
@@ -1,3 +1,3 @@
-Test-Command: make --directory=Release test
+Test-Command: NOMEMTEST=1 make --directory=Release test
 Features: test-name=siridb-unit-tests
 Depends: @, @builddeps@
diff --git a/include/siri/db/shard.h b/include/siri/db/shard.h
index 4d6813ab..969ef74d 100644
--- a/include/siri/db/shard.h
+++ b/include/siri/db/shard.h
@@ -105,6 +105,10 @@ int siridb_shard_get_points_log_compressed(
         uint64_t * start_ts,
         uint64_t * end_ts,
         uint8_t has_overlap);
+int siridb_shard_migrate(
+        siridb_t * siridb,
+        uint64_t shard_id,
+        uint64_t * duration);
 int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb);
 void siridb__shard_free(siridb_shard_t * shard);
 void siridb__shard_decref(siridb_shard_t * shard);
diff --git a/itest/test_auto_duration.py b/itest/test_auto_duration.py
new file mode 100644
index 00000000..2c0f147b
--- /dev/null
+++ b/itest/test_auto_duration.py
@@ -0,0 +1,462 @@
+import asyncio
+import functools
+import random
+import time
+import math
+import re
+from testing import Client
+from testing import default_test_setup
+from testing import gen_data
+from testing import gen_points
+from testing import gen_series
+from testing import InsertError
+from testing import PoolError
+from testing import QueryError
+from testing import run_test
+from testing import Series
+from testing import Server
+from testing import ServerError
+from testing import SiriDB
+from testing import TestBase
+from testing import UserAuthError
+from testing import parse_args
+
+
+class TestAutoDuration(TestBase):
+    title = 'Test select and aggregate functions'
+
+    @default_test_setup(2, compression=False, buffer_size=1024)
+    async def run(self):
+        await self.client0.connect()
+
+        gap =
+        ts = int(time.time()) -
+
+        points = []
+
+        for i in range(tx,)
+            [ts, i]
+            for i
+        ]
+
+        self.assertEqual(
+            await self.client0.insert(DATA),
+            {'success_msg': 'Successfully inserted {} point(s).'.format(
+                LENPOINTS)})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select difference() from "series-001 integer"'),
+            {'series-001 integer': [[1471254708, -8], [1471254710, -4]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select difference() => difference() '
+                'from "series-001 integer"'),
+            {'series-001 integer': [[1471254710, 4]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select difference() => difference() => difference() '
+                'from "series-001 integer"'),
+            {'series-001 integer': []})
+
+        now = int(time.time())
+        self.assertEqual(
+            await self.client0.query(
+                'select difference({}) from "series-001 integer"'.format(now)),
+            {'series-001 integer': [[now, -12]]})
+
+        now = int(time.time())
+        self.assertEqual(
+            await self.client0.query(
+                'select difference({}) from "series-001 integer"'.format(now)),
+            {'series-001 integer': [[now, -12]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select * from /series-001.*/ '
+                'merge as "median_low" using median_low({})'
+                .format(now)),
+            {'median_low': [[now, -3.5]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select * from /series-001.*/ '
+                'merge as "median_high" using median_high({})'
+                .format(now)),
+            {'median_high': [[now, -3.0]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select * from /series.*/ '
+                'merge as "max" using max(1s)'),
+            {'max': [
+                [1471254705, 5.0],
+                [1471254707, -2.5],
+                [1471254708, -1.0],
+                [1471254710, -7.0]
+            ]})
+
+        # Test all aggregation methods
+
+        self.assertEqual(
+            await self.client0.query('select sum(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 2663], [1447254000, 5409], [1447257600, 1602]]})
+
+        self.assertEqual(
+            await self.client0.query('select count(1h) from "aggr"'),
+            {'aggr': [[1447250400, 5], [1447254000, 12], [1447257600, 3]]})
+
+        self.assertEqual(
+            await self.client0.query('select mean(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 532.6],
+                [1447254000, 450.75],
+                [1447257600, 534.0]]})
+
+        self.assertEqual(
+            await self.client0.query('select median(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 532.0],
+                [1447254000, 530.5],
+                [1447257600, 533.0]]})
+
+        self.assertEqual(
+            await self.client0.query('select median_low(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 532], [1447254000, 530], [1447257600, 533]]})
+
+        self.assertEqual(
+            await self.client0.query('select median_high(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 532], [1447254000, 531], [1447257600, 533]]})
+
+        self.assertEqual(
+            await self.client0.query('select min(1h) from "aggr"'),
+            {'aggr': [[1447250400, 531], [1447254000, 54], [1447257600, 532]]})
+
+        self.assertEqual(
+            await self.client0.query('select max(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 535], [1447254000, 538], [1447257600, 537]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query('select variance(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 3.3],
+                [1447254000, 34396.931818181816],
+                [1447257600, 7.0]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query('select pvariance(1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 2.6399999999999997],
+                [1447254000, 31530.520833333332],
+                [1447257600, 4.666666666666667]]})
+
+        self.assertEqual(
+            await self.client0.query('select * from ({}) - ("a", "b")'.format(
+                ','.join(['"aggr"'] * 600)
+            )),
+            {'aggr': DATA['aggr']}
+        )
+
+        self.assertEqual(
+            await self.client0.query('select difference(1h) from "aggr"'),
+            {'aggr': [[1447250400, 1], [1447254000, -3], [1447257600, 5]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query('select derivative(1, 1h) from "aggr"'),
+            {'aggr': [
+                [1447250400, 0.0002777777777777778],
+                [1447254000, -0.0008333333333333333],
+                [1447257600, 0.001388888888888889]]})
+
+        self.assertEqual(
+            await self.client0.query('select filter(>534) from "aggr"'),
+            {'aggr': [
+                [1447249633, 535],
+                [1447250549, 537],
+                [1447252349, 537],
+                [1447253549, 538],
+                [1447254748, 537]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(/l.*/) from * where type == string'),
+            {'log': [p for p in DATA['log'] if re.match('l.*', p[1])]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(==/l.*/) from * where type == string'),
+            {'log': [p for p in DATA['log'] if re.match('l.*', p[1])]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(!=/l.*/) from * where type == string'),
+            {'log': [p for p in DATA['log'] if not re.match('l.*', p[1])]})
+
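+        # limit(n, aggregate) downsamples a series to at most n points using
+        # the given aggregate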
+        self.assertEqual(
+            await self.client0.query('select limit(300, mean) from "aggr"'),
+            {'aggr': DATA['aggr']})
+
+        self.assertEqual(
+            await self.client0.query('select limit(1, sum) from "aggr"'),
+            {'aggr': [[1447254748, 9674]]})
+
+        self.assertEqual(
+            await self.client0.query('select limit(3, mean) from "aggr"'),
+            {'aggr': [
+                [1447250938, 532.8571428571429],
+                [1447252844, 367.6666666666667],
+                [1447254750, 534.0]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select limit(2, max) from "series-001 float"'),
+            {'series-001 float': [[1471254707, 1.5], [1471254713, -7.3]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select variance(1471254712) from "variance"'),
+            {'variance': [[1471254712, 1.3720238095238095]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select pvariance(1471254715) from "pvariance"'),
+            {'pvariance': [[1471254715, 1.25]]})
+
+        self.assertEqual(
+            await self.client0.query('select * from "one"'),
+            {'one': [[1471254710, 1]]})
+
+        self.assertEqual(
+            await self.client0.query('select * from "log"'),
+            {'log': DATA['log']})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(~"log") => filter(!~"one") from "log"'),
+            {'log': [DATA['log'][1]]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(!=nan) from "special"'),
+            {'special': [p for p in DATA['special'] if not math.isnan(p[1])]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select filter(==nan) from "special"'),
+            {'special': [p for p in DATA['special'] if math.isnan(p[1])]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select filter(>=nan) from "special"'),
+            {'special': [p for p in DATA['special'] if math.isnan(p[1])]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select filter(<=nan) from "special"'),
+            {'special': [p for p in DATA['special'] if math.isnan(p[1])]})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(==inf) from "special"'),
+            {'special': [p for p in DATA['special'] if p[1] == math.inf]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select filter(inf) from "special"'),
+            {'special': []})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(==-inf) from "special"'),
+            {'special': [p for p in DATA['special'] if p[1] == -math.inf]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select filter(>-inf) from "special"'),
+            {'special': [p for p in DATA['special'] if p[1] > -math.inf]})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select filter(<-inf) from "special"'),
+            {'special': []})
+
+        self.assertEqual(
+            await self.client0.query(
+                'select filter(~"one") prefix "1-", '
+                'filter(~"two") prefix "2-" from "log"'),
+            {
+                '1-log': [
+                    [1471254710, 'log line one'],
+                    [1471254716, 'and yet one more']],
+                '2-log': [[1471254712, 'log line two']]
+            })
+
+        self.assertEqual(
+            await self.client0.query('select difference() from "one"'),
+            {'one': []})
+
+        with self.assertRaisesRegex(
+                QueryError,
+                'Regular expressions can only be used with.*'):
+            await self.client0.query('select filter(~//) from "log"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                'Cannot use a string filter on number type.'):
+            await self.client0.query('select filter(//) from "aggr"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Cannot use mean\(\) on string type\.'):
+            await self.client0.query('select mean(1w) from "log"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Group by time must be an integer value larger than zero\.'):
+            await self.client0.query('select mean(0) from "aggr"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Limit must be an integer value larger than zero\.'):
+            await self.client0.query('select limit(6 - 6, mean) from "aggr"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Cannot use a string filter on number type\.'):
+            await self.client0.query(
+                'select * from "aggr" '
+                'merge as "t" using filter("0")')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Cannot use difference\(\) on string type\.'):
+            await self.client0.query('select difference() from "log"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Cannot use derivative\(\) on string type\.'):
+            await self.client0.query('select derivative(6, 3) from "log"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Cannot use derivative\(\) on string type\.'):
+            await self.client0.query('select derivative() from "log"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                r'Overflow detected while using sum\(\)\.'):
+            await self.client0.query('select sum(now) from "huge"')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                'Max depth reached in \'where\' expression!'):
+            await self.client0.query(
+                'select * from "aggr" where ((((((length > 1))))))')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                'Cannot compile regular expression.*'):
+            await self.client0.query(
+                'select * from /(bla/')
+
+        with self.assertRaisesRegex(
+                QueryError,
+                'Memory allocation error or maximum recursion depth reached.'):
+            await self.client0.query(
+                'select * from {}"aggr"{}'.format(
+                    '(' * 501,
+                    ')' * 501))
+
+        with self.assertRaisesRegex(
+                QueryError,
+                'Query too long.'):
+            await self.client0.query('select * from "{}"'.format('a' * 65535))
+
+        with self.assertRaisesRegex(
+                QueryError,
+                'Error while merging points. Make sure the destination '
+                'series name is valid.'):
+            await self.client0.query(
+                'select * from "aggr", "huge" merge as ""')
+
+        self.assertEqual(
+            await self.client0.query(
+                'select min(2h) prefix "min-", max(1h) prefix "max-" '
+                'from /.*/ where type == integer and name != "filter" '
+                'and name != "one" and name != "series-002 integer" '
+                'merge as "int_min_max" using median_low(1) => difference()'),
+            {
+                'max-int_min_max': [
+                    [1447254000, 3], [1447257600, -1], [1471255200, -532]],
+                'min-int_min_max': [
+                    [1447257600, -477], [1471255200, -54]]})
+
+        await self.client0.query('select derivative() from "equal ts"')
+
+        self.assertEqual(
+            await self.client0.query('select first() from *'),
+            {k: [v[0]] for k, v in DATA.items()})
+
+        self.assertEqual(
+            await self.client0.query('select last() from *'),
+            {k: [v[-1]] for k, v in DATA.items()})
+
+        self.assertEqual(
+            await self.client0.query('select count() from *'),
+            {k: [[v[-1][0], len(v)]] for k, v in DATA.items()})
+
+        self.assertEqual(
+            await self.client0.query('select mean() from "aggr"'),
+            {'aggr': [[
+                DATA['aggr'][-1][0],
+                sum([x[1] for x in DATA['aggr']]) / len(DATA['aggr'])]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query('select stddev() from "aggr"'),
+            {'aggr': [[
+                DATA['aggr'][-1][0],
+                147.07108914792838]]})
+
+        self.assertAlmostEqual(
+            await self.client0.query('select stddev(1h) from "aggr"'),
+            {"aggr": [
+                [1447250400, 1.8165902124584952],
+                [1447254000, 185.46409846162092],
+                [1447257600, 2.6457513110645907]]})
+
+        # test prefix, suffix
+        result = await self.client0.query(
+            'select sum(1d) prefix "sum-" suffix "-sum", '
+            'min(1d) prefix "minimum-", '
+            'max(1d) suffix "-maximum" from "aggr"')
+
+        self.assertIn('sum-aggr-sum', result)
+        self.assertIn('minimum-aggr', result)
+        self.assertIn('aggr-maximum', result)
+
+        await self.client0.query('alter database set select_points_limit 10')
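+        # with the limit lowered to 10 points, a broad select must now fail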
+        with self.assertRaisesRegex(
+                QueryError,
+                'Query has reached the maximum number of selected points.*'):
+            await self.client0.query(
+                'select * from /.*/')
+        await self.client0.query(
+            'alter database set select_points_limit 1000000')
+
+        self.client0.close()
+
+
+if __name__ == '__main__':
+    parse_args()
+    run_test(TestAutoDuration())
diff --git a/itest/test_http_api.py b/itest/test_http_api.py
index 1a6577dc..37282103 100644
--- a/itest/test_http_api.py
+++ b/itest/test_http_api.py
@@ -190,7 +190,7 @@ class TestHTTPAPI(TestBase):
         self.assertEqual(x.json(), 'OK')
 
         self.db.servers.append(self.server2)
-        await self.assertIsRunning(self.db, self.client0, timeout=30)
+        await self.assertIsRunning(self.db, self.client0, timeout=50)
 
         x = requests.get(
             f'http://localhost:9022/get-databases', auth=auth)
diff --git a/itest/test_insert.py b/itest/test_insert.py
index bc0aebb0..6dcbc20d 100644
--- a/itest/test_insert.py
+++ b/itest/test_insert.py
@@ -165,19 +165,19 @@ class TestInsert(TestBase):
         await self.assertSeries(self.client0, series)
         await self.assertSeries(self.client1, series)
 
-        tasks = [
-            asyncio.ensure_future(self.client0.query(
-                'drop series /.*/ set ignore_threshold true'))
-            for i in range(5)]
+        # tasks = [
+        #     asyncio.ensure_future(self.client0.query(
+        #         'drop series /.*/ set ignore_threshold true'))
+        #     for i in range(5)]
 
-        await asyncio.gather(*tasks)
+        # await asyncio.gather(*tasks)
 
-        tasks = [
-            asyncio.ensure_future(self.client0.query(
-                'drop shards set ignore_threshold true'))
-            for i in range(5)]
+        # tasks = [
+        #     asyncio.ensure_future(self.client0.query(
+        #         'drop shards set ignore_threshold true'))
+        #     for i in range(5)]
 
-        await asyncio.gather(*tasks)
+        # await asyncio.gather(*tasks)
 
         await asyncio.sleep(2)
diff --git a/src/siri/backup.c b/src/siri/backup.c
index 3da9f531..217f4ee7 100644
--- a/src/siri/backup.c
+++ b/src/siri/backup.c
@@ -8,6 +8,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -206,7 +207,7 @@ static int BACKUP_walk(siridb_t * siridb, void * args __attribute__((unused)))
      * A lock is not needed since the optimize thread is paused and this
      * is running from the main thread.
      */
-    vec_t * shard_list = imap_2vec(siridb->shards);
+    vec_t * shard_list = siridb_shards_vec(siridb);
 
     if (shard_list == NULL)
     {
@@ -227,6 +228,7 @@ static int BACKUP_walk(siridb_t * siridb, void * args __attribute__((unused)))
         {
             siri_fp_close(shard->replacing->fp);
         }
+        --shard->ref;  /* at least two references exist */
     }
 
     vec_free(shard_list);
diff --git a/src/siri/db/listener.c b/src/siri/db/listener.c
index 693ca008..dbbdd4e4 100644
--- a/src/siri/db/listener.c
+++ b/src/siri/db/listener.c
@@ -2390,7 +2390,7 @@ static void exit_count_shards(uv_async_t * handle)
 
     if (q_count->where_expr == NULL)
     {
-        q_count->n = siridb->shards->len;
+        q_count->n = siridb_shards_n(siridb);
     }
     else
     {
@@ -2403,7 +2403,7 @@ static void exit_count_shards(uv_async_t * handle)
 
         uv_mutex_lock(&siridb->shards_mutex);
 
-        shards_list = imap_2vec_ref(siridb->shards);
+        shards_list = siridb_shards_vec(siridb);
 
         uv_mutex_unlock(&siridb->shards_mutex);
 
@@ -2466,7 +2466,7 @@ static void exit_count_shards_size(uv_async_t * handle)
 
     uv_mutex_lock(&siridb->shards_mutex);
 
-    shards_list = imap_2vec_ref(siridb->shards);
+    shards_list = siridb_shards_vec(siridb);
 
     uv_mutex_unlock(&siridb->shards_mutex);
 
@@ -2926,7 +2926,7 @@ static void exit_drop_shards(uv_async_t * handle)
 
     uv_mutex_lock(&siridb->shards_mutex);
 
-    q_drop->shards_list = imap_2vec_ref(siridb->shards);
+    q_drop->shards_list = siridb_shards_vec(siridb);
 
     uv_mutex_unlock(&siridb->shards_mutex);
 
@@ -3468,7 +3468,7 @@ static void exit_list_shards(uv_async_t * handle)
 
     uv_mutex_lock(&siridb->shards_mutex);
 
-    shards_list = imap_2vec_ref(siridb->shards);
+    shards_list = siridb_shards_vec(siridb);
 
     uv_mutex_unlock(&siridb->shards_mutex);
 
diff --git a/src/siri/db/points.c b/src/siri/db/points.c
index e672a776..b3939a82 100644
--- a/src/siri/db/points.c
+++ b/src/siri/db/points.c
@@ -1693,13 +1693,14 @@ uint64_t siridb_points_get_interval(siridb_points_t * points)
     uint64_t * arr;
     uint64_t x, a, b, c;
 
-    n = points->len - 1;
-    n = n > 63 ? 63 : n;
-    if (n < 7)
+    if (points->len < 8)
     {
         return 0;
     }
 
+    n = points->len - 1;
+    n = n > 63 ? 63 : n;
+
     arr = malloc(n * sizeof(uint64_t));
     if (arr == NULL)
     {
diff --git a/src/siri/db/shard.c b/src/siri/db/shard.c
index 7a5ff7c6..286df429 100644
--- a/src/siri/db/shard.c
+++ b/src/siri/db/shard.c
@@ -173,6 +173,96 @@ uint64_t siridb_shard_interval_from_duration(uint64_t duration)
     return duration / OPTIMAL_POINTS_PER_SHARD;;
 }
 
+int siridb_shard_migrate(
+        siridb_t * siridb,
+        uint64_t shard_id,
+        uint64_t * duration)
+{
+    FILE * fp;
+    char * fn, * new_fn;
+    int rc;
+    size_t n;
+    uint8_t schema, tp;
+    rc = asprintf(
+            &fn,
+            "%s%s%" PRIu64 ".sdb",
+            siridb->dbpath,
+            SIRIDB_SHARDS_PATH,
+            shard_id);
+    if (rc < 0)
+    {
+        log_error("Cannot create shard filename");
+        return -1;
+    }
+
+    if ((fp = fopen(fn, "r")) == NULL)
+    {
+        log_error("Cannot open (old) shard file for reading: '%s'", fn);
+        return -1;
+    }
+
+    char header[HEADER_SIZE];
+
+    if (fread(&header, HEADER_SIZE, 1, fp) != 1)
+    {
+        /* cannot read the header from the shard file;
+         * close the file and return -1
+         */
+        fclose(fp);
+        log_critical("Missing header in (old) shard file: '%s'", fn);
+        return -1;
+    }
+
+    schema = (uint8_t) header[HEADER_SCHEMA];
+    if (schema > SIRIDB_SHARD_SHEMA)
+    {
+        fclose(fp);
+        log_critical(
+            "Shard file '%s' has schema '%u' which is not supported with "
+            "this version of SiriDB.", fn, schema);
+        return -1;
+    }
+
+    tp = (uint8_t) header[HEADER_TP];
+    fclose(fp);
+
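+    /* old-style shards have no duration encoded in their filename; adopt
+     * the database's current duration for this shard type (number or log) */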
+    *duration = tp == SIRIDB_SHARD_TP_NUMBER
+        ? siridb->duration_num
+        : siridb->duration_log;
+
+    rc = asprintf(
+            &new_fn,
+            "%s%s%016"PRIX64"_%016"PRIX64".sdb",
+            siridb->dbpath,
+            SIRIDB_SHARDS_PATH,
+            shard_id,
+            *duration);
+    if (rc < 0)
+    {
+        log_error("Cannot create new shard file name");
+        free(fn);
+        free(new_fn);
+        return -1;
+    }
+
+    (void) rename(fn, new_fn);
+
+    n = strlen(fn);
+    fn[n-3] = 'i';
+    fn[n-1] = 'x';
+
+    n = strlen(new_fn);
+    new_fn[n-3] = 'i';
+    new_fn[n-1] = 'x';
+
+    (void) rename(fn, new_fn);
+
+    free(fn);
+    free(new_fn);
+
+    return 0;
+}
+
 /*
  * Returns 0 if successful or -1 in case of an error.
  * When an error occurs, a SIGNAL can be raised in some cases but not for sure.
@@ -183,6 +273,7 @@ int siridb_shard_load(siridb_t * siridb, uint64_t id, uint64_t duration)
     FILE * fp;
     off_t shard_sz;
     siridb_shard_t * shard = malloc(sizeof(siridb_shard_t));
+    omap_t * shards;
 
     if (shard == NULL)
     {
@@ -316,7 +407,18 @@ int siridb_shard_load(siridb_t * siridb, uint64_t id, uint64_t duration)
         return -1;
     }
 
-    if (imap_set(siridb->shards, id, shard) == -1)
+    shards = imap_get(siridb->shards, id);
+    if (shards == NULL)
+    {
+        shards = omap_create();
+        if (shards == NULL || imap_set(siridb->shards, id, shards) == -1)
+        {
+            siridb_shard_decref(shard);
+            return -1;
+        }
+    }
+
+    if (omap_set(shards, duration, shard) == NULL)
     {
         siridb_shard_decref(shard);
         return -1;
     }
@@ -1476,13 +1578,22 @@ void siridb__shard_decref(siridb_shard_t * shard)
 void siridb_shard_drop(siridb_shard_t * shard, siridb_t * siridb)
 {
     siridb_series_t * series;
-    siridb_shard_t * pop_shard;
+    siridb_shard_t * pop_shard = NULL;
+    omap_t * shards;
     int optimizing = 0;
 
     uv_mutex_lock(&siridb->series_mutex);
     uv_mutex_lock(&siridb->shards_mutex);
 
-    pop_shard = (siridb_shard_t *) imap_pop(siridb->shards, shard->id);
+    shards = imap_get(siridb->shards, shard->id);
+    if (shards)
+    {
+        pop_shard = omap_rm(shards, shard->duration);
+        if (shards->n == 0)
+        {
+            free(imap_pop(siridb->shards, shard->id));
+        }
+    }
 
     /*
      * When optimizing, 'pop_shard' is always the new shard and 'shard'
diff --git a/src/siri/db/shards.c b/src/siri/db/shards.c
index 784a5062..b7c061e7 100644
--- a/src/siri/db/shards.c
+++ b/src/siri/db/shards.c
@@ -30,6 +30,28 @@
 
 #define SIRIDB_SHARD_LEN 37
 
+static bool SHARDS_must_migrate_shard(
+        char * fn,
+        const char * ext,
+        uint64_t * shard_id)
+{
+    size_t n = strlen(fn);
+    char * tmp = NULL;
+
+    if (n < 6)
+    {
+        return false;
+    }
+
+    *shard_id = strtoull(fn, &tmp, 16);
+
+    if (tmp == NULL)
+    {
+        return false;
+    }
+
+    return strcmp(tmp, ext) == 0;
+}
 
 static bool SHARDS_read_id_and_duration(
         char * fn,
@@ -57,13 +79,14 @@ static bool SHARDS_read_id_and_duration(
         return false;
     }
 
+    ++fn;
+
     *duration = strtoull(fn, &tmp, 16);
     if (tmp == NULL)
    {
        return false;
    }
    fn = tmp;
-
     return strcmp(fn, ext) == 0;
 }
 
@@ -72,20 +95,20 @@
  */
 static bool SHARDS_is_temp_fn(char * fn)
 {
-    int i;
-    uint64_t shard_id, duration;
-    for (i = 0; i < 2; i++, fn++)
-    {
-        if (*fn != '_')
-        {
-            return false;
-        }
-    }
+    size_t n = strlen(fn);
 
-    return (
-        SHARDS_read_id_and_duration(fn, ".sdb", &shard_id, &duration) ||
-        SHARDS_read_id_and_duration(fn, ".idx", &shard_id, &duration)
-    );
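+    /* a temporary shard file starts with "__" and ends in ".sdb" or ".idx" */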
+    return (n > 8 &&
+        fn[0] == '_' &&
+        fn[1] == '_' &&
+        fn[n-4] == '.' && ((
+            fn[n-3] == 's' &&
+            fn[n-2] == 'd' &&
+            fn[n-1] == 'b'
+        ) || (
+            fn[n-3] == 'i' &&
+            fn[n-2] == 'd' &&
+            fn[n-1] == 'x'
+        )));
 }
 
@@ -136,10 +159,11 @@ int siridb_shards_load(siridb_t * siridb)
 
     for (n = 0; n < total; n++)
     {
-        if (SHARDS_is_temp_fn(shard_list[n]->d_name))
+        char * base_fn = shard_list[n]->d_name;
+
+        if (SHARDS_is_temp_fn(base_fn))
         {
-            snprintf(buffer, XPATH_MAX, "%s%s",
-                    path, shard_list[n]->d_name);
+            snprintf(buffer, XPATH_MAX, "%s%s", path, base_fn);
 
             log_warning("Removing temporary file: '%s'", buffer);
 
@@ -152,19 +176,34 @@ int siridb_shards_load(siridb_t * siridb)
         }
 
         if (!SHARDS_read_id_and_duration(
-                shard_list[n]->d_name,
+                base_fn,
                 ".sdb",
                 &shard_id,
                 &duration))
         {
-            /* TODO: migration code, for backwards compatibility */
-            continue;
+            if (SHARDS_must_migrate_shard(
+                    base_fn,
+                    ".sdb",
+                    &shard_id))
+            {
+                log_info("Migrate shard: '%s'", base_fn);
+                if (siridb_shard_migrate(siridb, shard_id, &duration))
+                {
+                    log_error("Error while migrating shard: '%s'", base_fn);
+                    rc = -1;
+                    break;
+                }
+            }
+            else
+            {
+                continue;
+            }
         }
 
         /* we are sure this fits since the filename is checked */
         if (siridb_shard_load(siridb, shard_id, duration))
        {
-            log_error("Error while loading shard: '%s'", shard_list[n]->d_name);
+            log_error("Error while loading shard: '%s'", base_fn);
             rc = -1;
             break;
         }
@@ -361,7 +400,7 @@ vec_t * siridb_shards_vec(siridb_t * siridb)
     {
         return NULL;
     }
-    imap_walk(siridb->shards, (imap_cb) SHARDS_to_vec_cb, &n);
+    (void) imap_walk(siridb->shards, (imap_cb) SHARDS_to_vec_cb, vec);
     return vec;
 }
 
@@ -381,19 +420,15 @@ double siridb_shards_count_percent(
 
     uv_mutex_lock(&siridb->shards_mutex);
 
-    if (siridb->shards->len == 0)
-    {
-        percent = 0.0;
-    }
-    else
-    {
-        shards_list = imap_2vec_ref(siridb->shards);
-    }
+    shards_list = siridb_shards_vec(siridb);
 
     uv_mutex_unlock(&siridb->shards_mutex);
 
-    if (shards_list == NULL)
-        return percent;
+    if (shards_list == NULL || shards_list->len == 0)
+    {
+        free(shards_list);
+        return 0.0;
+    }
 
     for (i = 0; i < shards_list->len; i++)
     {
@@ -407,6 +442,6 @@ double siridb_shards_count_percent(
     }
     percent = total ? (double) count / (double) total : 0.0;
 
-    vec_free(shards_list);
+    free(shards_list);
     return percent;
 }
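
For reference, a minimal standalone sketch of the rename performed by siridb_shard_migrate() above (illustration only, not part of the patch; the id and duration values are made up): an old-style shard file named with "%" PRIu64 ".sdb" gets a new name built with "%016" PRIX64 "_%016" PRIX64 ".sdb", so the shard duration becomes part of the filename.

/* sketch of the old vs. new shard filename format; example values are made up */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int main(void)
{
    uint64_t shard_id = 123456789;  /* hypothetical shard id */
    uint64_t duration = 604800;     /* hypothetical shard duration */
    char old_fn[64];
    char new_fn[64];

    snprintf(old_fn, sizeof(old_fn), "%" PRIu64 ".sdb", shard_id);
    snprintf(new_fn, sizeof(new_fn), "%016" PRIX64 "_%016" PRIX64 ".sdb",
            shard_id, duration);

    /* prints: 123456789.sdb -> 00000000075BCD15_0000000000093A80.sdb */
    printf("%s -> %s\n", old_fn, new_fn);
    return 0;
}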